package com.starxg.m3u8.download;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import com.starxg.m3u8.config.ApplicationConfig;
import com.starxg.m3u8.parser.M3U8Item;
import com.starxg.m3u8.parser.M3U8ItemStatus;
import com.starxg.m3u8.utils.HttpCallback;
import com.starxg.m3u8.utils.HttpUtils;
import com.starxg.m3u8.utils.ThreadPoolUtils;

import javafx.application.Platform;
import javafx.beans.value.ChangeListener;
import javafx.beans.value.ObservableValue;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Call;
import okhttp3.Request;
import okhttp3.Response;

/**
 * M3U8Download
 *
 * @author huangxingguang@lvmama.com
 * @date 2020-08-20 15:45
 */
@Slf4j
public class M3U8Download {
    private Collection<M3U8Item> items;
    private Future<Void> future;
    private M3U8DownloadCallback<M3U8Download> callback;
    private Semaphore semaphore;
    private CountDownLatch latch;
    private AtomicInteger succeeded;
    private File savePath;

    public M3U8Download(Collection<M3U8Item> items, File savePath, M3U8DownloadCallback<M3U8Download> callback) {
        this.callback = Objects.requireNonNull(callback);
        this.items = Collections.unmodifiableCollection(items);
        this.savePath = savePath;
    }

    @SuppressWarnings("unchecked")
    public M3U8Download start() {
        semaphore = new Semaphore(ApplicationConfig.getInstance().getApp().getHttpConnections());
        succeeded = new AtomicInteger();
        // 如果成功的过滤掉，其它的状态变成等待中
        items = items.stream().filter(e -> {
            // 下载成功 且 保存位置不为空 且 文件已存在
            if (e.getStatus() == M3U8ItemStatus.SUCCEEDED && e.getSavePath() != null && e.getSavePath().exists()) {
                succeeded.incrementAndGet();
                return false;
            }

            e.setStatus(M3U8ItemStatus.WAITING);
            return true;
        }).collect(Collectors.toList());
        latch = new CountDownLatch(items.size());
        callback.before(this);

        future = (Future<Void>) ThreadPoolUtils.submit(() -> {
            try {
                startDownloadItems();
                Platform.runLater(() -> callback.succeeded());
            } catch (InterruptedException ex) {
                Platform.runLater(() -> callback.stopped());
            } finally {
                Platform.runLater(() -> callback.completed());
            }
        });

        return this;
    }

    private void startDownloadItems() throws InterruptedException {
        int retryCount = ApplicationConfig.getInstance().getApp().getHttpRetryCount();
        for (M3U8Item item : items) {
            if (item.canDownload()) {
                downloadItem(item, retryCount);
            }
        }
        latch.await();
    }

    private void downloadItem(M3U8Item item, int retryCount) throws InterruptedException {

        semaphore.acquire();

        item.setStatus(M3U8ItemStatus.RUNNING);

        Request.Builder ok = HttpUtils.prepareGet(item.getUrl());
        ok.header("RANGE", "bytes=" + item.getRead() + "-");
        Call future = HttpUtils.enqueue(ok.build(), new HttpCallback() {

            @Override
            public void onFailure(Call call, Throwable e) {

                semaphore.release();

                if (call.isCanceled() || !item.canDownload()) {
                    return;
                }

                if (retryCount != 0) {
                    item.setStatus(M3U8ItemStatus.RETRYING);
                    log.error("分片 [{}] 下载失败。原因:{}，3 秒后重试。", item.getName(), e.getMessage(), e);
                    ThreadPoolUtils.submit(() -> {
                        if (item.canDownload()) {
                            try {
                                downloadItem(item, retryCount == -1 ? -1 : retryCount - 1);
                            } catch (InterruptedException ex) {
                                log.error(ex.getMessage(), ex);
                            }
                        }
                    }, 3, TimeUnit.SECONDS);
                    return;
                }

                latch.countDown();
                Platform.runLater(() -> callback.part(item, e));
            }

            @Override
            public void onResponse(Call call, Response response) throws Exception {

                write(item, call, response);

                succeeded.incrementAndGet();
                semaphore.release();
                latch.countDown();
                Platform.runLater(() -> callback.part(item));
            }

            @Override
            public void completed() {
            }
        });

        item.statusProperty().addListener(new M3U8ItemStatusListener(item, future));

    }

    private void write(M3U8Item item, Call call, Response response) throws IOException {
        File file = new File(savePath.getParent(), item.getName());
        try (var is = Objects.requireNonNull(response.body()).source(); var out = new RandomAccessFile(file, "rw")) {
            int len;
            byte[] bytes = new byte[1024 * 8];
            long total = item.getTotal() == 0 ? response.body().contentLength() : item.getTotal();
            item.setTotal(total);
            int read = item.getRead();
            out.seek(read);
            while ((len = is.read(bytes)) > -1) {
                out.write(bytes, 0, len);
                item.setRead(read += len);
                callback.process(item, total, read, read * 1.00 / total);
                if (call.isCanceled() || !item.canDownload()) {
                    return;
                }
            }
        }
        item.setSavePath(file);
    }

    /**
     * 暂停下载
     */
    public void stop() {
        log.info("暂停线程 {}", future.cancel(true));
    }

    public int getSucceeded() {
        return succeeded == null ? 0 : succeeded.get();
    }

    @AllArgsConstructor
    private static class M3U8ItemStatusListener implements ChangeListener<M3U8ItemStatus> {
        private M3U8Item item;
        private Call call;

        @Override
        public void changed(ObservableValue<? extends M3U8ItemStatus> observable, M3U8ItemStatus oldValue,
                M3U8ItemStatus newValue) {
            if (newValue == M3U8ItemStatus.PAUSED) {
                call.cancel();
                item.statusProperty().removeListener(this);
                log.info("[{}] 下载暂停", item.getName());
            }
        }
    }

}
